[3-2]订阅数据APP演示
1.定义模型
1.1 定义测点模型:
DataShareYX.json
{
"model": "DataShareYX",
"body": [{
"name": "yc1",
"type": "float",
"unit": "",
"deadzone": "1",
"ratio": "1",
"isReport": "1",
"userdefine": "0,1,0",
"desc": "遥测1",
"data_row": ["single", "single_p"]
},{
"name": "yx1",
"type": "int",
"unit": "",
"deadzone": "1",
"ratio": "1",
"isReport": "1",
"userdefine": "1,1,0",
"desc": "遥信1",
"data_row": ["single", "single_p"]
}
]
}
1.2 定义引用模型
DataShareYXRefTable.json
{
"model_list": [
{
"name": "MultiMeter",
"tag_list": ["PWR_Off_Op_phsA",
"PWR_Off_Op_phsB",
"PWR_Off_Op_phsC",
"PowerOff_Alm",
"PWR_On_Op_phsA",
"PWR_On_Op_phsB",
"PWR_On_Op_phsC",
"PowerOn_Alm"
],
"tag_list_param": []
}
],
"devlist": {
"body": [
{"devNo": "50", "addr": "000000000001", "desc": "", "model": "MultiMeter", "port": "PLC" }
]
}
}
1.3 APP配置文件:
config.json
{
"szBrokerAddr": "tcp://192.168.1.101:1883",
"nDebugLevel": 1
}
2.演示代码
/************************************************************************/
/** \file
* \n 文 件 名 : main.c
* \n 文件功能 : 营销数据共享
* \n 所属模块 :
* \n 作 者 :
* \n 创建时间 : 2021.11.2
* \n 备 注 :
1、引用电表数据
2、监听数据中心入库消息,写入APP共享数据区
3、刷新虚设备实例数据
* \n 历史记录 :
*/
/************************************************************************/
#include "rtdb.h"
#include "public.h"
/************************************************************************/
/* 全局变量 */
/************************************************************************/
#define ID_LINK_NUM 1 //链路号
#define ID_DEV_ADDR 0 //设备内存索引
#define APP_NAME "DataShareYX" //APP名称
#define APP_NAME_MGNT "DataShareYX_MGNT" //APP名称
#define BROKER_ADDRESS "tcp://172.17.0.1:1883" //MQTT Broker IP和端口号 172.17.0.1 192.168.1.101 127.0.0.1
#define DEV_MODEL_NAME "DataShareYX"
#define DEV_MODEL_JSON_FILE "DataShareYX.json"
#define DEV_PORT_NAME "vT"
#define DEV_PORT_ADDR "1"
#define DEV_HDB_JSON_FILE "hdb.json" //历史数据存储
#define DEV_CONFIG_JSON_FILE "config.json" //配置文件
#define DEV_NO_ADC 1 //ADC设备编号
#define DEV_NO_RCD_BEGIN 10 //开关起始设备编号
#define DEV_NO_METER_BEGIN 50 //电表起始设备编号
#define TITLE_PARAMETER_CHANGE "dataCenter/Broadcast/JSON/report/notification/parameter"
#define TITLE_REMOTECTRL "+/Broadcast/JSON/action/request/remoteCtrl"
#define TITLE_DBC_SETVAL "+/dataCenter/JSON/set/request/MultiMeter/#" //目前单个模型范例, 引用多个设备时,增加主题
#define TITLE_RPT_SPONT "+/Broadcast/JSON/report/notification/MultiMeter/#" //目前单个模型范例, 引用多个设备时,增加主题
extern int s_mqtt_topic_eaqul(const char *topicFilter,const char *topicName);//DBC库中内部函数
/************************************************************************/
/* 全局变量 */
/************************************************************************/
TAppConfig appconfig; //APP初始化配置
TAppConfig appconfigMGNT; //APP初始化配置
REGISTER_GUID_INFO_TYPE guidOutput_dev ; //设备GUID获取
TRefDB_V2 dbRef; //引用设备实时库
/************************************************************************/
/* 运算用逻辑变量 */
/************************************************************************/
//! 电表内存变量
typedef struct
{
TRefTagItem* pItem_PowerOff_Alm;//停电
TRefTagItem* pItem_PowerOn_Alm; //上电
}TMeterRam;
//! APP内存
typedef struct
{
cJSON* config_json_data; //配置数据JSON结构体(用于存储本地配置数据)
int bFirstRunFlag; //初次运行标志
//配置参数
char szBrokerAddr[64]; //BROKER地址
int nDebugLevel; //调试打印等级
}TAppRam;
TAppRam g_AppRam;
/************************************************************************/
/* 局部函数 */
/************************************************************************/
static void CreateRefDB( void );
static int UserMqttArrivedUserCb(const char* strTitle, void* hJson);
static void ProcessLogic(int nMsPass);
static void LoadConfig(void);
/************************************************************************/
/* 主入口函数 */
/************************************************************************/
int main(void)
{
int nRet;
int nTickLast;
int nTickLastSpont;
int nTickNow;
//读取配置文件
LoadConfig();
//设置APP配置参数
DBC_SetAppConfigName(&appconfig, g_AppRam.szBrokerAddr, APP_NAME, 0);
//设置IOT配置参数
DBC_SetAppConfigName(&appconfigMGNT, g_AppRam.szBrokerAddr, APP_NAME_MGNT, 0);
//采集APP初始化事件库
RDB_SOE_Init(FALSE);
//初始化应用连接
DBC_mqtt_init(&appconfig);
//初始化消息总线
MQTT_Init(&appconfigMGNT);
//等待连接建立
while(!MQTT_GetConnectionStatus())
{
Osl_Delay(100);
}
//设置全局消息接收回调
MQTT_AddSubscribe(TITLE_DBC_SETVAL, 0); //其他APP入数据中心主题,不用时屏蔽该行
MQTT_AddSubscribe(TITLE_RPT_SPONT, 0); //其他APP主动推送数据变化主题,不用时屏蔽该行
//MQTT_AddSubscribe(TITLE_PARAMETER_CHANGE, 0);//参数变更主题,不用时屏蔽该行
//MQTT_AddSubscribe(TITLE_REMOTECTRL, 0);//遥控消息主题,不用时屏蔽该行
MQTT_SetMqttArrivedUserCb(UserMqttArrivedUserCb);
//创建设备内存
RDB_CreateLocalDB(1);
//注册模型
while( DBC_setmodel_byfile(DEV_MODEL_JSON_FILE, 3000) < 0 )
{
printf("DBC_setmodel_byfile: fail !!! \n");
Osl_Delay(1000);
}
printf("DBC_setmodel_byfile: done. \n");
//注册设备
while( DBC_RegDevInst(DEV_MODEL_NAME, DEV_PORT_NAME, DEV_PORT_ADDR, "",
"DAQO",
1,
&guidOutput_dev ) < 0)
{
printf("DBC_RegDevInst: fail !!! \n" );
Osl_Delay(1000);
}
printf("DBC_RegDevInst: done. \n");
//创建本地测点库、参数库,设备无参数话参数文件名填 NULL
nRet = RDB_CreateDevInst(&g_localDB.m_pDevInstArray[0],
ID_LINK_NUM, ID_DEV_ADDR,
DEV_MODEL_NAME,
guidOutput_dev.dev,
DEV_MODEL_JSON_FILE,
NULL);
if (nRet < 0)
{
printf("CreateDevInst dev Inst device FAIL!!! \n");
while(1)
{
Osl_Delay(100);
}
}
printf("RDB_CreateDevInst: done. \n");
//创建引用数据库(用于逻辑判断)
CreateRefDB();
nTickNow = nTickLast = nTickLastSpont = Osl_GetTickCount();
g_AppRam.bFirstRunFlag = TRUE;
//循环处理
while( 1 )
{
//----------------------------------------------------------------------
// 采集数据(从物理或从引用设备获取输入数据)
//----------------------------------------------------------------------
//进行一次引用设备数据采集
//RDBRef_V2_GetRtValuesAll(&dbRef, 0); //主动从数据中心刷新引用数值值,不用时屏蔽该行
//----------------------------------------------------------------------
// 进行逻辑判断,产生新模型数据
//----------------------------------------------------------------------
nTickNow = Osl_GetTickCount();
ProcessLogic(aabs(nTickNow-nTickLast) * 10 );
nTickLast = nTickNow;
//----------------------------------------------------------------------
// 模型数据刷新
//----------------------------------------------------------------------
//刷新遥测
RTDB_SetYcValueByID(ID_LINK_NUM, ID_DEV_ADDR, 1, 0, 0);
//刷新遥信
RTDB_SetYxValueByID(ID_LINK_NUM, ID_DEV_ADDR, 1, 0, 0);
//变化通知
RDB_NotifyYc(ID_LINK_NUM, ID_DEV_ADDR, TRUE);
RDB_NotifyYx(ID_LINK_NUM, ID_DEV_ADDR, TRUE);
//定时刷实时库(类似总召)
RDB_UpdateYcToDB(ID_LINK_NUM, ID_DEV_ADDR, FLAG_RUN_CHANGED);
RDB_UpdateYxToDB(ID_LINK_NUM, ID_DEV_ADDR, FLAG_RUN_CHANGED);
//延时1S
Osl_Delay(100);
}
return 0;
}
/************************************************************************/
/* 创建引用表数据库 */
/************************************************************************/
static void CreateRefDB( void )
{
int nRet;
int i;
T104DevItem* pDevItem;
TMeterRam* pMeterRam;
//从引用表配置创建内存库
nRet = RDBRef_V2_CreateDBFromFile(&dbRef, GetConfigFilePath(APP_NAME, "DataShareYXRefTable.json" ) );
if (nRet)
{
//刷新未获取设备实例的项目(通过挂接设备方式,通用方式)
RDB104_UpdateDevListDbByVt( &dbRef.m_devList, 3000);
printf("RDBRef_V2_CreateDBFromFile OK.\n");
}
else
{
printf("RDBRef_V2_CreateDBFromFile FAIL!\n");
}
//初始引用设备处理用逻辑变量
for (i=0; i<dbRef.m_devList.m_nDevCount; i++)
{
pDevItem = &dbRef.m_devList.m_pDevItemArray[i];
if (pDevItem->devNo >= DEV_NO_METER_BEGIN)
{
//分配设备运算变量内存
pMeterRam = (TMeterRam*)malloc(sizeof(TMeterRam));
memset(pMeterRam, 0, sizeof(TMeterRam) );
pDevItem->pUserData2=pMeterRam;
//初始化引用变量句柄(用户快速定位)
pMeterRam->pItem_PowerOff_Alm = RDBRef_V2_GetYcItemByTag(&dbRef, pDevItem->devNo, "PowerOff_Alm");
pMeterRam->pItem_PowerOn_Alm = RDBRef_V2_GetYcItemByTag(&dbRef, pDevItem->devNo, "PowerOn_Alm");
}
}
}
//处理共享逻辑
static void ProcessLogic(int nMsPass)
{
//处理APP逻辑
int i;
T104DevItem* pDevItem;
TMeterRam* pMeterRam;
for (i=0; i<dbRef.m_devList.m_nDevCount; i++)
{
pDevItem = &dbRef.m_devList.m_pDevItemArray[i];
//处理电表数据
if (pDevItem->devNo >= DEV_NO_METER_BEGIN)
{
pMeterRam = (TMeterRam*)pDevItem->pUserData2;
//停电上报
if (pMeterRam->pItem_PowerOff_Alm->flag & FLAG_TAG_CHANGED)
{
pMeterRam->pItem_PowerOff_Alm->flag &=~ FLAG_TAG_CHANGED;
printf("PowerOff_Alm = %d \n", RDBRef_V2_GetYcValueInt(pMeterRam->pItem_PowerOff_Alm) );
}
//上电上报
if (pMeterRam->pItem_PowerOn_Alm->flag & FLAG_TAG_CHANGED)
{
pMeterRam->pItem_PowerOn_Alm->flag &=~ FLAG_TAG_CHANGED;
printf("PowerOn_Alm = %d \n", RDBRef_V2_GetYcValueInt(pMeterRam->pItem_PowerOn_Alm) );
}
}
//处理其他设备数据
}
}
//加载配置文件
static void LoadConfig(void)
{
const char* strFullPath = GetConfigFilePath(APP_NAME, DEV_CONFIG_JSON_FILE);
cJSON* config_json_data ;
char* strTmp;
//清空内存
memset(&g_AppRam, 0, sizeof(g_AppRam) );
//读取配置文件
config_json_data = LoadJsonData(strFullPath);
if (config_json_data)
{
strTmp = GetJsonStringByHandleAndTag(config_json_data, "szBrokerAddr");
if (strTmp && strcmp(strTmp,""))
{
strcpy(g_AppRam.szBrokerAddr, strTmp );
}
else
{
strcpy(g_AppRam.szBrokerAddr, BROKER_ADDRESS );
}
g_AppRam.nDebugLevel = GetJsonIntByHandleAndTag(config_json_data, "nDebugLevel");
//释放配置文件资源
//cJSON_Delete(config_json_data);
g_AppRam.config_json_data = config_json_data;
}
else
{
printf("load config : %s FAIL! \n", strFullPath);
//默认配置
strcpy(g_AppRam.szBrokerAddr, BROKER_ADDRESS );
g_AppRam.nDebugLevel = 1;
g_AppRam.config_json_data = cJSON_CreateObject();
cJSON_AddItemToObject(g_AppRam.config_json_data, "szBrokerAddr", cJSON_CreateString(g_AppRam.szBrokerAddr));
cJSON_AddItemToObject(g_AppRam.config_json_data, "nDebugLevel", cJSON_CreateNumber(g_AppRam.nDebugLevel));
}
printf("g_AppRam.szBrokerAddr = %s \n", g_AppRam.szBrokerAddr);
printf("g_AppRam.nDebugLevel = %d \n", g_AppRam.nDebugLevel);
#ifdef WIN32
ILog_Init("./log","log.txt",D_TRACE);
#else
ILog_Init("/data/app/DataShareYX/log","log.txt",g_AppRam.nDebugLevel);
#endif
}
//! MQTT处理函数, 注意该回调函数里面不能立刻回MQTT响应数据
static int UserMqttArrivedUserCb(const char* strTitle, void* hJson)
{
TitleDef theTitle;
cJSON* pJsonData = (cJSON*)hJson;
T104DevItem* pDevItem;
TRefTagItem* pRefTagItem;
int nCount;
int i;
GetTitleDefine(strTitle, &theTitle) ;
//判断主题是否是自己匹配的主题,处理json, 不能释放!
//1. 数据中心数据入库
if ( s_mqtt_topic_eaqul(TITLE_DBC_SETVAL, strTitle))
{
/*
0 1 2 3 4 5 6
{app}/dataCenter/JSON/set/request/{设备类型}/{设备标识}
{
"token": "123",
"timestamp": "2019-03-01T09:30:08.230+0800",
"data_row": "single",
"body": [{
"name": "PhV_phsA",
"val": "220.324",
"quality": "0",
"secret": "1",
"timestamp": "2019-03-01T09:30:08.230+0800"
}
]
}
*/
cJSON* pTokenJson = cJSON_GetObjectItem(pJsonData, "token");
cJSON* pDataJson = cJSON_GetObjectItem(pJsonData, "body");
cJSON* pJson_data_row = cJSON_GetObjectItem(pJsonData, "data_row");
if (pJson_data_row && pTokenJson && pDataJson)
{
//单点值入库
if( strcmp(pJson_data_row->valuestring,"single")==0 )
{
//找到匹配的设备实例
pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, theTitle.items[6].szTag);
if (pDevItem)
{
//更新设备实例实时数值
nCount = cJSON_GetArraySize(pDataJson);
for (i=0; i<nCount; i++)
{
cJSON* pMeasItem = cJSON_GetArrayItem(pDataJson, i);
cJSON* pNameJson = cJSON_GetObjectItem(pMeasItem, "name");
cJSON* pValueJson = cJSON_GetObjectItem(pMeasItem, "val");
if (pNameJson && pValueJson)
{
pRefTagItem = RDBRef_V2_GetYcItemByTag2(pDevItem, pNameJson->valuestring);
if (pRefTagItem)
{
strcpy(pRefTagItem->val, pValueJson->valuestring);
pRefTagItem->flag |= FLAG_TAG_REFLASHED; //置变更标志
}
}
}
}
}
}
return TRUE;
}
//1.2 主动变化遥信、遥测处理
if ( s_mqtt_topic_eaqul(TITLE_RPT_SPONT, strTitle))
{
/*
{app}/Broadcast/JSON/report/notification/{设备类型}/{设备标识}
{
"token": "123",
"timestamp": "2019-03-01T09: 30: 08.230+0800",
"datatype": "0",
"body": [{
"name": "PhV_phsA",
"id": "1",
"val": "220.324",
"unit": "",
"quality": "0",
"timestamp": "2019-03-01T09: 30: 08.230+0800"
}
]
}
*/
//找到匹配的设备实例
pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, theTitle.items[6].szTag);
if (pDevItem)
{
cJSON* pTokenJson = cJSON_GetObjectItem(pJsonData, "token");
cJSON* pDatatypeJson = cJSON_GetObjectItem(pJsonData, "datatype");
cJSON* pDataJson = cJSON_GetObjectItem(pJsonData, "body");
if (pDatatypeJson && pTokenJson && pDataJson)
{
int nDataType = atoi(pDatatypeJson->valuestring);
//遥测或遥信主动推送,更新当前测值
if (nDataType==0 || nDataType==1)
{
//更新设备实例实时数值
nCount = cJSON_GetArraySize(pDataJson);
for (i=0; i<nCount; i++)
{
cJSON* pMeasItem = cJSON_GetArrayItem(pDataJson, i);
cJSON* pNameJson = cJSON_GetObjectItem(pMeasItem, "name");
cJSON* pValueJson = cJSON_GetObjectItem(pMeasItem, "val");
if (pNameJson && pValueJson)
{
pRefTagItem = RDBRef_V2_GetYcItemByTag2(pDevItem, pNameJson->valuestring);
if (pRefTagItem)
{
strcpy(pRefTagItem->val, pValueJson->valuestring);
pRefTagItem->flag |= FLAG_TAG_REFLASHED|FLAG_TAG_CHANGED; //置变更标志
}
}
}
}
}
}
}
//2. 参数变更处理
if ( s_mqtt_topic_eaqul(TITLE_PARAMETER_CHANGE, strTitle))
{
/*
dataCenter/Broadcast/JSON/report/notification/parameter
{
"token": "234",
"timestamp": "2019-03-01T09:30:09.230+0800",
"appName": "ADC",
"body": [{
"dev": "MCCB_ guid",
"body": [{
"name": "span",
"val": "5",
"unit": "%",
"datatype": "Int"
},
{
"name": "ratio",
"val": "50",
"unit": "%",
"datatype": "Int"
}
]
}]
}*/
cJSON* pJsonDev = FindJsonByTag(pJsonData, "body[0].dev");
cJSON* pJsonParamArr = FindJsonByTag(pJsonData, "body[0].body");
if (pJsonDev && pJsonParamArr)
{
//异步处理,置更新指定设备参数命令请求(采集线程然后将指定参数修改下发到设备)
if (g_AppRam.bFirstRunFlag)
{
pDevItem = RDB104_FindDevItemByDevInst(&dbRef.m_devList, pJsonDev->valuestring);
if (pDevItem)
{
//更新设备实例参数数值
nCount = cJSON_GetArraySize(pJsonParamArr);
for (i=0; i<nCount; i++)
{
cJSON* pMeasItem = cJSON_GetArrayItem(pJsonParamArr, i);
cJSON* pNameJson = cJSON_GetObjectItem(pMeasItem, "name");
cJSON* pValueJson = cJSON_GetObjectItem(pMeasItem, "val");
if (pNameJson && pValueJson)
{
pRefTagItem = RDBRef_V2_GetParamItemByTag2(pDevItem, pNameJson->valuestring);
if (pRefTagItem)
{
strcpy(pRefTagItem->val, pValueJson->valuestring);
pRefTagItem->flag |= FLAG_TAG_REFLASHED|FLAG_TAG_CHANGED; //置变更标志
}
}
}
}
}
}
return TRUE;
}
//3. 遥控报文范例
if ( s_mqtt_topic_eaqul(TITLE_REMOTECTRL, strTitle))
{
/*
{app1}/Broadcast/JSON/action/request/remoteCtrl
{
"token": "123",
"timestamp": "2019-03-01T09:30:08.230+0800",
"body": [{
"dev": "dev_guid",
"name": "batAct",
"type": "SCO",
"cmd": "1",
"action": "1",
"mode": "0",
"timeout": "10"
}]
}
*/
//解析遥控命令,转发到采集线程
cJSON* pDataJson = FindJsonByTag(pJsonData, "body[0]");
if (pDataJson)
{
/*
dev string 是 设备标识
name string 是 遥控变量名称
type string 是 遥控类型,SCO单点遥信,DCO双点遥信,其它无效。
cmd string 是 0:分闸,1:合闸,其它无效。
action string 是 0:撤销,1:执行,2:预置,其它无效。
mode string 是 0:被控站内部确定;1:短脉冲方式;2:长脉冲方式;3:持续脉冲方式;其它无效。
timeout string 是 超时时间,单位秒(s)。
*/
//主要处理dev、name、cmd、timeout
//保存命令的主题及JSON, 遥控结束时,返回到遥控源发APP
}
return TRUE;
}
return FALSE;
}